Integrate MetadataPayloadExchange into Shuffler #649
pentschev wants to merge 3 commits into rapidsai:main
Conversation
madsbk
left a comment
Overall looks great, @pentschev.
It would be good to run some benchmarks. I assume we expect the performance to remain unchanged?
```cpp
        size, br_->stream_pool().get_stream(), br_
    );
},
statistics
```
statistics might have been moved in line 349. Change line 349 to statistics_{statistics}.
But statistics is a shared_ptr, and it only gets moved after we have initialized MetadataPayloadExchange; by that time, MetadataPayloadExchange already holds a copy of the shared_ptr and has increased its refcount. I think this is not necessary.
No, the order in which members are initialized in a constructor is guaranteed (it follows their declaration order in the class), but the evaluation order of the arguments passed to those members is not. Therefore, std::move(statistics) might be evaluated first.
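For illustration, here is a minimal, self-contained sketch of the hazard (hypothetical names, not the actual Shuffler code): when a copy and a std::move of the same shared_ptr appear as sibling arguments of one call, their evaluation order is unspecified, so the copy may observe an already-moved-from (empty) pointer.

```cpp
#include <iostream>
#include <memory>
#include <utility>

struct Statistics {};

// Hypothetical consumer taking two handles to the same statistics object.
void consume(std::shared_ptr<Statistics> copy, std::shared_ptr<Statistics> moved) {
    std::cout << "copy is " << (copy ? "valid" : "empty") << '\n';
}

int main() {
    auto statistics = std::make_shared<Statistics>();
    // The two argument expressions below are indeterminately sequenced:
    // if std::move(statistics) is evaluated first, the first parameter is
    // copy-constructed from an already-emptied shared_ptr.
    consume(statistics, std::move(statistics));
}
```

Initializing the member from statistics by copy, as suggested above, avoids the problem at the cost of one extra refcount bump.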
pentschev
left a comment
Thanks @madsbk, I ran bench_shuffle with 4 workers as you suggested (I don't currently have exclusive access to an 8xH100 machine), and below are some results:
Before (run 1)
[PRINT:0:0] means: 0.01 sec | local throughput: 134.29 GiB/s | global throughput: 537.17 GiB/s | in_parts: 1 | out_parts: 2 | nranks: 4
[PRINT:1:0] means: 0.01 sec | local throughput: 128.22 GiB/s | global throughput: 512.87 GiB/s | in_parts: 1 | out_parts: 2 | nranks: 4
[PRINT:3:0] means: 0.01 sec | local throughput: 127.86 GiB/s | global throughput: 511.45 GiB/s | in_parts: 1 | out_parts: 2 | nranks: 4
[PRINT:2:0] means: 0.01 sec | local throughput: 124.80 GiB/s | global throughput: 499.18 GiB/s | in_parts: 1 | out_parts: 2 | nranks: 4
After (run 1)
[PRINT:2:0] means: 0.01 sec | local throughput: 132.01 GiB/s | global throughput: 528.06 GiB/s | in_parts: 1 | out_parts: 2 | nranks: 4
[PRINT:1:0] means: 0.01 sec | local throughput: 127.69 GiB/s | global throughput: 510.74 GiB/s | in_parts: 1 | out_parts: 2 | nranks: 4
[PRINT:0:0] means: 0.01 sec | local throughput: 129.64 GiB/s | global throughput: 518.55 GiB/s | in_parts: 1 | out_parts: 2 | nranks: 4
[PRINT:3:0] means: 0.01 sec | local throughput: 128.10 GiB/s | global throughput: 512.41 GiB/s | in_parts: 1 | out_parts: 2 | nranks: 4
As you may notice, there's enough variability to make it difficult to say definitively whether performance is better or worse, but I think it's fair to say there are no regressions: the minimum throughput is generally higher, while the maximum is lower. Running multiple times may paint a different picture; in the results below, both the minimum and maximum throughput are higher than before.
Before (run 2)
[PRINT:1:0] means: 0.01 sec | local throughput: 131.85 GiB/s | global throughput: 527.39 GiB/s | in_parts: 1 | out_parts: 2 | nranks: 4
[PRINT:0:0] means: 0.01 sec | local throughput: 131.35 GiB/s | global throughput: 525.39 GiB/s | in_parts: 1 | out_parts: 2 | nranks: 4
[PRINT:2:0] means: 0.01 sec | local throughput: 124.80 GiB/s | global throughput: 499.19 GiB/s | in_parts: 1 | out_parts: 2 | nranks: 4
[PRINT:3:0] means: 0.01 sec | local throughput: 124.88 GiB/s | global throughput: 499.53 GiB/s | in_parts: 1 | out_parts: 2 | nranks: 4
After (run 2)
[PRINT:1:0] means: 0.01 sec | local throughput: 129.12 GiB/s | global throughput: 516.48 GiB/s | in_parts: 1 | out_parts: 2 | nranks: 4
[PRINT:0:0] means: 0.01 sec | local throughput: 129.59 GiB/s | global throughput: 518.36 GiB/s | in_parts: 1 | out_parts: 2 | nranks: 4
[PRINT:2:0] means: 0.01 sec | local throughput: 134.02 GiB/s | global throughput: 536.08 GiB/s | in_parts: 1 | out_parts: 2 | nranks: 4
[PRINT:3:0] means: 0.01 sec | local throughput: 129.40 GiB/s | global throughput: 517.61 GiB/s | in_parts: 1 | out_parts: 2 | nranks: 4
With that said, I propose we merge this once we're happy with how the code looks and tests pass, and then observe performance in our nightly runs over the next few days to make sure there are at least no regressions. WDYT?
Follow-up on #437, switching Shuffler to use the new MetadataPayloadExchange. This helps simplify the code in Shuffler and simultaneously removes the need for an "ack" message, also opening the possibility of using an Active Message-based communicator (TBD).
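For context, here is a highly simplified sketch of the exchanged-pair pattern (hypothetical types and names; the real MetadataPayloadExchange API may differ): the metadata message carries enough information for the receiver to allocate the buffer and post the matching payload receive itself, so no separate "ack"/ready round-trip is required, which is also what makes an Active Message-style communicator feasible.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <iostream>
#include <memory>
#include <vector>

// Hypothetical stand-ins, not the real API.
struct Buffer {
    std::vector<std::uint8_t> data;
};

struct Metadata {
    std::uint64_t op_id;       // identifies the shuffle chunk
    std::size_t payload_size;  // lets the receiver allocate up front
};

// Previously the receiver replied with an "ack" once a buffer was ready,
// and only then did the sender transmit the payload. Here the metadata
// alone drives buffer allocation and the matching payload receive, so no
// ack round-trip is needed.
void on_metadata(
    Metadata const& md,
    std::function<void(std::uint64_t, std::unique_ptr<Buffer>)> post_recv
) {
    auto buf = std::make_unique<Buffer>();
    buf->data.resize(md.payload_size);    // allocation driven by metadata
    post_recv(md.op_id, std::move(buf));  // post matching receive; no ack sent
}

int main() {
    on_metadata({42, 1024}, [](std::uint64_t op_id, std::unique_ptr<Buffer> buf) {
        std::cout << "posted receive for op " << op_id << " with "
                  << buf->data.size() << " bytes\n";
    });
}
```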